package com.smile.cloud.admin.rocketmq;

import com.smile.cloud.admin.rocketmq.core.DedupConfig;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;

/**
 * @author LGC
 */
@Slf4j
@Data
@Configuration
@ConfigurationProperties(prefix = "rocketmq.consumer")
public class MQConsumerConfigure {

    private String groupName;
    private String nameServerAddr;
    private String topics;
    private Integer consumeThreadMin;
    private Integer consumeThreadMax;
    private Integer consumeMessageBatchMaxSize;

    @Autowired
    private MQConsumeMsgListenerProcessor consumeMsgListenerProcessor;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * mq default 消费者配置
     *
     * @return
     */
//    @Bean
////    @ConditionalOnProperty(prefix = "rocketmq.consumer", value = "isOnOff", havingValue = "on")
//    public DefaultMQPushConsumer defaultConsumer() {
//        log.info("defaultConsumer 正在创建---------------------------------------");
//        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
//        consumer.setNamesrvAddr(nameServerAddr);
//        consumer.setConsumeThreadMin(consumeThreadMin);
//        consumer.setConsumeThreadMax(consumeThreadMax);
//        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
//        // 设置监听
//        consumer.registerMessageListener(consumeMsgListenerProcessor);
//
//        /**
//         * 设置consumer第一次启动是从队列头部开始还是队列尾部开始
//         * 如果不是第一次启动，那么按照上次消费的位置继续消费
//         */
//        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//        /**
//         * 设置消费模型，集群还是广播，默认为集群
//         */
//        consumer.setMessageModel(MessageModel.CLUSTERING);
//
//        try {
//            // 设置该消费者订阅的主题和tag，如果订阅该主题下的所有tag，则使用*,
//            String[] topicArr = topics.split(";");
//            for (String tag : topicArr) {
//                String[] tagArr = tag.split("~");
//                consumer.subscribe(tagArr[0], tagArr[1]);
//            }
//            consumer.start();
//            log.info("consumer 创建成功 groupName={}, topics={}, nameServerAddr={}", groupName, topics, nameServerAddr);
//        } catch (MQClientException e) {
//            log.error("consumer 创建失败!");
//        }
//        return consumer;
//    }

    /**
     * mq default 消费者配置
     *
     * @return
     */
    @Bean
//    @ConditionalOnProperty(prefix = "rocketmq.consumer", value = "isOnOff", havingValue = "on")
    public DefaultMQPushConsumer defaultConsumer() {
        log.info("defaultConsumer 正在创建---------------------------------------");
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(nameServerAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        try {
            consumer.registerMessageListener(messageListener());
            // 设置该消费者订阅的主题和tag，如果订阅该主题下的所有tag，则使用*,
            String[] topicArr = topics.split(";");
            for (String tag : topicArr) {
                String[] tagArr = tag.split("~");
                consumer.subscribe(tagArr[0], tagArr[1]);
            }
            log.info("consumer 创建成功 groupName={}, topics={}, nameServerAddr={}", groupName, topics, nameServerAddr);
            consumer.start();
        } catch (MQClientException e) {
            log.error("consumer 创建失败!");
        }
        return consumer;
    }

    @Bean
    public DedupConfig dedupConfig() {
        return DedupConfig.enableDedupConsumeConfig(groupName, stringRedisTemplate);
    }

    @Bean
    @ConditionalOnClass(value = DedupConfig.class)
    public SampleListener messageListener() {
        return new SampleListener(dedupConfig());
    }
}